package com.jplus.plugins.rpc.impl.handler.scoket;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.jplus.framework.bean.BeanHandle;
import com.jplus.framework.util.FormatUtil;
import com.jplus.framework.util.serializer.SerializationUtils;
import com.jplus.plugins.rpc.Constant;
import com.jplus.plugins.rpc.bean.KeyNode;
import com.jplus.plugins.rpc.bean.RpcRequest;
import com.jplus.plugins.rpc.bean.RpcResponse;
import com.jplus.plugins.rpc.bean.Service;
import com.jplus.plugins.rpc.bean.ZNode;
import com.jplus.plugins.rpc.impl.iface.IAgreement;
import com.jplus.plugins.rpc.impl.iface.IWatcher;

/**
 * Scoket
 * 
 * @author Yuanqy
 *
 */
public class ScoketService implements IAgreement {

	private Logger logger = LoggerFactory.getLogger(ScoketService.class);

	@Override
	public void opServer(IWatcher watcher, final Map<String, Service> handler) throws Exception {
		final int port = Constant.SCOKET_PORT;
		logger.debug("Export RPC server on port " + port);
		@SuppressWarnings("resource")
		final ServerSocket scoketServer = new ServerSocket(port);
		watcher.process(true);// 通知同步
		new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					logger.info("RPC [scoket] server started at port: {}", port);
					while (true) {
						try {
							final Socket socket = scoketServer.accept();// 瓶颈所在
							socket.setSoTimeout(Constant.RPC_SERVER_TIMEOUT);
							new Thread(new Runnable() {
								@Override
								public void run() {
									try {
										logger.debug("The host [" + socket.getInetAddress().getHostAddress() + "] is client");
										ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
										try {
											RpcRequest request = (RpcRequest) SerializationUtils.deserialize((byte[]) input.readObject());
											Object result = null;
											ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
											Service impl = handler.get(new KeyNode(request).MD5());
											Object serverImpl = BeanHandle.getBean(impl.getImpl());
											try {
												if (serverImpl == null) {
													throw new NullPointerException("Don't find provider[" + request.getClass()
															+ "],Please check if the providers have been started and registered.");
												} else {
													logger.info(">>>Coming a RPC request[{}]:{},{}", "Scoket", impl.getImpl().getName(),
															request.getMethodName());
													Method method = serverImpl.getClass().getMethod(request.getMethodName(), request.getParameterTypes());
													result = method.invoke(serverImpl, request.getParameters());
													output.writeObject(SerializationUtils.serialize(result));
												}
											} catch (Throwable t) {
												output.writeObject(t);
											} finally {
												output.close();
											}
										} finally {
											logger.debug("The host [" + socket.getInetAddress().getHostAddress() + "] is over");
											input.close();
										}
									} catch (Exception e) {
										try {
											socket.close();
										} catch (IOException e1) {
											e1.printStackTrace();
										}
										e.printStackTrace();
									}
								}
							}).start();
						} catch (Exception e) {
							e.printStackTrace();
						}
					}
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
		}).start();
	}

	@Override
	public RpcResponse opClient(RpcRequest request, ZNode serNode) throws Exception {
		Socket socket = new Socket(serNode.getHost(), serNode.getPort());
		logger.debug("RPC reqeust:{} from server {}:{}", FormatUtil.toJSONString(request), serNode.getHost(), serNode.getPort());
		socket.setSoTimeout(Constant.RPC_CLIENT_TIMEOUT);
		try {
			ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
			try {
				output.writeObject(SerializationUtils.serialize(request));
				ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
				try {
					Object result = SerializationUtils.deserialize((byte[]) input.readObject());
					RpcResponse response = new RpcResponse();
					response.setRequestId(request.getRequestId());
					if (result instanceof Throwable)
						response.setError((Throwable) result);
					else {
						response.setResult(result);
					}
					return response;
				} catch (SocketTimeoutException ste) {
					logger.error("资源请求超时[" + Constant.RPC_CLIENT_TIMEOUT + "s]:" + FormatUtil.toJSONString(request));
					throw ste;
				} finally {
					input.close();
				}
			} finally {
				output.close();
			}
		} finally {
			socket.close();
		}
	}

}
